{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 4.用`threading`和`concurrent.futures`模块\n",
    "在上一章，我们总结了并行思想可以解决的一些问题。这一章，我们将用Python的`threading`模块实现每个问题的解决方案。\n",
    "\n",
    "本章内容包括以下主题：\n",
    "- 线程的定义\n",
    "- `threading`和`_thread`的选择\n",
    "- 用`threading`实现多请求的Fibonacci数列\n",
    "- 用`concurrent.futures`模块实现网络爬虫\n",
    "\n",
    "<!-- TEASER_END-->"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 线程的定义\n",
    "线程（thread）是一个进程（process）中不同执行单元（execution line）。如果把一个程序比喻成蜂巢（hive），那么向蜂巢里收集花粉（pollen）这件事就是一个进程。这个收集花粉的进程是由若干只为了解决花粉不足问题的工蜂（worker bee）同时工作来完成。这些工蜂扮演的角色就是线程，在进程内部执行并共享资源以完成各自的任务。\n",
    "\n",
    "线程属于同一个进程并共享同一块内存。因此，开发者的任务就是控制和使用这块内存。\n",
    "\n",
    "### 使用线程的优缺点\n",
    "使用线程之前，需要考虑线程具有的一些优缺点，而且线程与具体的编程语言和操作系统有密切关联。\n",
    "\n",
    "线程的优点主要有：\n",
    "- 线程在同一个进程内的通信速度，数据定位速度，和信息共享速度都是非常快的\n",
    "- 创建线程的成本比创建进程低，因为它不需要复制主进程上下文所包含的全部信息\n",
    "- 通过处理器缓存优化内存接入速度可以高效使用数据局部存储（data locality，thread local storage，TLS）\n",
    "\n",
    "线程的缺点主要有：\n",
    "- 数据共享允许快速通信。但是，有时会让经验不足的开发者犯错。\n",
    "- 数据共享限制了解决方案的灵活性。比如，迁移到分布性架构时，线程是十分令人头疼的事情。通常，线程会限制算法的扩展性。\n",
    "\n",
    ">在Python程序语言里，由于GIL的存在，使用带有CPU限制的线程可能影响程序的性能。\n",
    "\n",
    "### 线程类型\n",
    "用两种线程：内核线程（kernel thread）与用户线程（user thread）。内核线程是由操作系统创建和管理的线程。线程上下文的切换（Context Switch），调度，和结束都是通过操作系统的内核管理的。而用户线程的所有状态是由开发者控制的。\n",
    "\n",
    "我们可以对比两者的优缺点。\n",
    "\n",
    "内核线程的优点主要有：\n",
    "- 一个内核线程常只轻量进程（lightweight processes）。因此如果一个内核线程阻塞，其他内核线程还可以运行\n",
    "- 内核线程可以在不同的CPU上运行\n",
    "\n",
    "内核线程的缺点主要有：\n",
    "- 创建与同步操作开销都较大\n",
    "- 实现方式与平台有关\n",
    "\n",
    "用户线程的优点主要有：\n",
    "- 创建与同步操作开销都较小\n",
    "- 实现方式与平台无关\n",
    "\n",
    "用户线程的缺点主要有：\n",
    "- 一个进程内的所有用户线程只能与一个内核线程关联。因此，如果一个用户线程阻塞，其他用户线程都不能执行\n",
    "- 用户线程不运行在不同的CPU上\n",
    "\n",
    "### 线程状态定义\n",
    "线程生命周期中一共五个状态，如下所示：\n",
    "- **创建**：主进程创建一个线程，创建之后，会被发送到线程队列准备执行\n",
    "- **执行**：这个阶段，线程使用CPU\n",
    "- **就绪**：这个阶段，线程在线程队列里准备就绪即将执行\n",
    "- **阻塞**：这个阶段，线程被阻塞，可能是等待I/O操作完成，这个阶段不使用CPU\n",
    "- **结束**：这个阶段，释放执行时占用的资源，结束线程的生命周期\n",
    "\n",
    "### `threading`和`_thread`的选择\n",
    "Python标准库里提供了两个线程模块：`_thread`模块（这个Python模块提供了一个线程的底层API，具体文档见*`https://docs.python.org/3.5/library/_thread.html#module-_thread`*）和`threading`模块（这个Python模块提供了一个线程的高层API，具体文档见*`https://docs.python.org/3.5/library/threading.html#module-threading`*）。这个`threading`模块通过了比`_thread`模块更友好的接口，使用起来更方便。开发者根据自己口味选择。如果开发者觉得用线程的底层API更舒服，可以实现自己的线程池（thread pool），互斥锁和其他原始特征，那么他可以选择`_thread`模块。否则，还是用`threading`模块最方便。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 用`threading`实现多请求的Fibonacci数列\n",
    "\n",
    "现在是时候回到现实。我们的任务是在Fibonacci数列有多个输入请求时并行的执行。处于教学的目的，我们把输入值设置成四个，每个输入值分配一个线程运行函数，让worker与要执行的任务（task）完美地一一对应。算法如下所示：\n",
    "\n",
    "1. 首先，用列表储存需要计算的四个值，然后把值发送到一个允许线程同步使用的结构中。\n",
    "2. 当等待计算的值被发送到可同步使用的结构中之后，计算Fibonacci数列的线程需要一个信号告诉它们数据已经准备就绪了。我们用一个叫`Condition`的线程同步机制（Thread synchronization mechanism）。（`Condition`机制是一个Python对象，用来提供多个线程之间数据使用的同步机制，具体文档查看*`https://docs.python.org/3/library/threading.html#condition-objects`*。）\n",
    "3. 当每个线程完成对应Fibonacci数列的计算之后，把结果保存到词典中。\n",
    "\n",
    "那么，现在让我们演示最有趣的代码和注释内容。\n",
    "\n",
    "代码开始的时候，我们使用Unicode编码格式，然后导入`logging`，`threading`和`Queue`模块。另外，我们在例子中已经定义了主要的数据结构。我们定义了一个字典`fibo_dict`，把每个整数（作为用户输入）存储为键，对应Fibonacci数列的计算作为字典的值。我们还用`queue`模块中的`Queue`模块声明了一个`Queue`对象`shared_queue`，用来存储计算Fibonacci数列线程之间的共享数据，线程把数据插入`Queue`对象。最后，我们定义最后一个数据结构——一个带四个元素的Python的`list`对象，模拟程序即将接收到的值。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "#coding: utf-8\n",
    "import logging, threading\n",
    "\n",
    "from queue import Queue\n",
    "\n",
    "logger = logging.getLogger()\n",
    "logger.setLevel(logging.DEBUG)\n",
    "formatter = logging.Formatter('%(asctime)s - %(message)s')\n",
    "\n",
    "ch = logging.StreamHandler()\n",
    "ch.setLevel(logging.DEBUG)\n",
    "ch.setFormatter(formatter)\n",
    "logger.addHandler(ch)\n",
    "\n",
    "fibo_dict = {}\n",
    "shared_queue = Queue()\n",
    "input_list = [3, 10, 5, 7]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ">**下载源代码**\n",
    "\n",
    ">你可以在Packt网站*`https://www.packtpub.com/`*，用你的账号下载所有书籍的示例代码。如果你从其他地方买的书，你可以先用*`https://www.packtpub.com/books/content/support`*搜索书籍名称，然后用你的邮箱地址注册一个账号下载代码。\n",
    "\n",
    "下面一行代码，我们将定义一个`threading`模块里的`Condition`对象。这个对象可以通过一个具体的条件来同步资源使用的状态。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "queue_condition = threading.Condition()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "使用`Condition`对象是为了控制队列的创建，以及里面发生的过程。\n",
    "\n",
    "代码的另一个部分是定义线程即将执行的函数`fibonacci_task`。`fibonacci_task`函数的参数是`condition`对象，用来控制`fibonacci_task`使用`shared_queue`。我们使用`with`语句（关于`with`语句更多的信息，请参考*`https://docs.python.org/3/reference/compound_stmts.html#the-with-statement`*）来简化上下文的形式。如果不使用`with`语句，我们就需要显式地对线程锁进行获取和释放。使用`with`语句，我们就可以在程序一开始就获取线程锁，然后在程序结束的时候自动退出线程锁。在`fibonacci_task`函数的`with`语句后面是一个逻辑表达式，告诉当前的线程，“当`shared_queue`为空时，一直等待”。等待就是用`condition`对象的`wait()`方法是实现的。线程会等待，直到它被告知`shared_queue`可以使用时才停止。一旦条件得到满足，就会立即计算Fibonacci数列的值并存储到字典`fibo_dict`中。最后，我们调用`task_done()`方法，确认某一个排队的任务已经被抽取出来执行了。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def fibonacci_task(condition):\n",
    "    with condition:\n",
    "        while shared_queue.empty():\n",
    "            logger.info(\"[%s] - waiting for elements in queue..\"\n",
    "                % threading.current_thread().name)\n",
    "            condition.wait()\n",
    "        else:\n",
    "            value = shared_queue.get()\n",
    "            a, b = 0, 1\n",
    "            for item in range(value):\n",
    "                a, b = b, a + b\n",
    "                fibo_dict[value] = a\n",
    "        shared_queue.task_done()\n",
    "        logger.debug(\"[%s] fibonacci of key [%d] with result [%d]\" %\n",
    "            (threading.current_thread().name, value, fibo_dict[value]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们定义的第二个函数是`queue_task`是由线程执行的，把线程计算的结果添加到`shared_queue`里。我们会发现函数把`condition`作为线程同步接入`shared_queue`队列的参数。线程会把`input_list`中的每个元素都存储到`shared_queue`队列中。\n",
    "\n",
    "当元素都存储到`shared_queue`队列之后，函数会告诉线程负责计算Fibonacci数列的队列已经准备好了。这个函数用`condition.notifyAll()`完成，如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def queue_task(condition):\n",
    "    logging.debug('Starting queue_task...')\n",
    "    with condition:\n",
    "        for item in input_list:\n",
    "            shared_queue.put(item)\n",
    "        logging.debug(\"Notifying fibonacci_task threads that the queue is ready to consume..\")\n",
    "        condition.notifyAll()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在后面的代码里，我们创建一个`shared_queue`准备条件已经满足的四线程组。然后我们使用一个允许自定义函数的`thread`类构造器（constructor），这个线程将用`target`参数设置要执行函数，在`args`里面放入要执行函数的参数，如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "threads = [threading.Thread(daemon=True, target=fibonacci_task,\n",
    "                            args=(queue_condition,)) for i in range(4)]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然后，我们开启准备计算Fibonacci数列的线程。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "[thread.start() for thread in threads]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "紧接着，我们再创建并执行一个控制`shared_queue`的线程。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "prod = threading.Thread(name='queue_task_thread', daemon=True, \n",
    "                        target=queue_task, args=(queue_condition,))\n",
    "prod.start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "最后，我们对所有线程调用`join()`方法来计算Fibonacci数列。这么做的目的是让`main`线程等待这些线程全部执行完Fibonacci数列，这样它就不会在其他线程还没有结束之前就停止。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "[thread.join() for thread in threads]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "程序的执行结果如下图所示：\n",
    "![](http://muxuezi.github.io/posts/ppp/ch4/threading_fibonacci.png)\n",
    "可以看出，一开始`fibonacci_task`线程被创建并初始化，紧接着进入等待状态。同时，`queue_task`被创建，然后生成`shared_queue`。最后，`queue_task`告诉`fibonacci_task`线程可以执行任务了。\n",
    "\n",
    "还会看到，`fibonacci_task`线程执行并不是自然顺序，每次运行线程执行顺序都会变化。这就是线程的特点：不确定性。\n",
    "\n",
    "[源代码](http://muxuezi.github.io/posts/ppp/ch4/parallel_fibonacci.py)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## 用`concurrent.futures`模块并行抓取网页\n",
    "下面我们将用代码来实现一个并行网络爬虫。这里我们会用一个非常有意思的Python功能，`concurrent.futures`模块中的`ThreadPoolExecutor`。在前面的例子中，我们用非常原始的线程功能实现了`parallel_fibonacci.py`。而且我们是通过手工操作创建和初始化多个线程。在线程较多的项目中，这些做很难管理。通常，都会使用线程池解决线程的状态管理问题。线程池是一种用来保存若干线程的结构，在进程里使用之前创建。其目的是重用线程，这样可以避免重复创建线程——降低资源消耗。\n",
    "\n",
    "下面的内容基本和上一节一样，我们需要一个通过多个阶段来执行不同任务的算法，这些任务是彼此相关的。我们将演示并行网络爬虫的代码。\n",
    "\n",
    "导入模块并设置日志文件之后，我们用Python的[`re`模块](https://docs.python.org/3/library/re.html#module-re)新建一个正则表达式，我们将在抓取阶段用这个表达式从网页源代码中筛选出所有链接。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "html_link_regex = re.compile('<a\\s(?:.*?\\s)*?href=[\\'\"](.*?)[\\'\"].*?>')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "紧接着，我们用一个同步队列来模拟多个输入URL任务。然后，新建一个字典`result_dict`来存储结果，把URL和对应页面的链接列表存在字典中。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "urls = queue.Queue()\n",
    "urls.put('http://www.sina.com')\n",
    "urls.put('http://cn.bing.com/')\n",
    "urls.put('https://coding.net/')\n",
    "urls.put('http://github.com/')\n",
    "urls.put('http://mail.126.com/')\n",
    "\n",
    "result_dict = {}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "之后我们定义一个函数`group_urls_task`，从同步队列中抽取URL加入到字典`result_dict`里，URL作为字典`result_dict`的键。还有一个细节要注意是同步队列的`get()`函数用了两个参数。一个参数是`True`用来阻塞同步队列接入。另一个参数是`0.05`秒，避免在同步队列中没有任务时等待太长时间。有时候，你不会愿意花太多时间阻塞线程来等待任务出现。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def group_urls_task(urls):\n",
    "    try:\n",
    "        url = urls.get(True, 0.05)\n",
    "        result_dict[url] = None\n",
    "        logger.info(\"[%s] putting url [%s] in dictionary...\" % (\n",
    "            threading.current_thread().name, url))\n",
    "    except queue.Empty:\n",
    "        logging.error('Nothing to be done, queue is empty')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "现在，我们用函数`crawl_task`实现每个URL的页面的抓取。基本上抓取阶段就是获取URL对应页面上的所有链接。函数返回值是一个元组，第一个元素是URL，第二个元素是URL对应页面上的所有链接构成的列表。这里我们用[`requests`模块](http://docs.python-requests.org/en/latest/)获取URL对应页面的内容。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def crawl_task(url):\n",
    "    links = []\n",
    "    try:\n",
    "        request_data = requests.get(url)\n",
    "        logger.info(\"[%s] crawling url [%s] ...\" % (\n",
    "            threading.current_thread().name, url))\n",
    "        links = html_link_regex.findall(request_data.text)\n",
    "    except:\n",
    "        logger.error(sys.exc_info()[0])\n",
    "        raise\n",
    "    finally:\n",
    "        return (url, links)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "进一步分析代码，我们会看到里面创建了一个`concurrent.futures`模块中的[`ThreadPoolExecutor`对象](https://docs.python.org/3.5/library/concurrent.futures.html)。在`ThreadPoolExecutor`对象的构造器里，设置了`max_workers`参数。这个参数在线程池里定义了供执行器（executor）使用的线程数量。在前面将URL从同步队列移除并作为键增加到字典`result_dict`这个阶段，都是用这个三个worker线程完成的。这个数量可以根据问题的需要进行设置。完成`ThreadPoolExecutor`对象的定义之后，我们用`with`语句保证线程的结束例程（ending routine），这些例程将会在`with`语句范围结束后立即执行。在`ThreadPoolExecutor`对象的范围内，我们在同步队列中重复它，以`submit`方法对同步队列中的URL执行一个引用。总之，`submit`方法为执行安排了一个可调用函数，然后返回一个带有创建执行的例程安排（scheduling）的`Future`对象。`submit`方法接收一个可调用函数和函数的参数；在我们的例子里，可调用函数就是`group_urls_task`，函数的参数就是同步队列`urls`。当这些参数被调用之后，线程池中的worker线程会以一种并行，异步的方式执行函数预订（booking）。代码如下所示："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "with concurrent.futures.ThreadPoolExecutor(max_workers=3) as group_link_threads:\n",
    "    for i in range(urls.qsize()):\n",
    "        group_link_threads.submit(group_urls_task, urls)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "之后我们又定义了一个新的`ThreadPoolExecutor`对象；但是这一次我们想用前面`group_urls_task`函数生成的字典`result_dict`的键来执行抓取阶段的任务。代码会有些不同："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "with concurrent.futures.ThreadPoolExecutor(max_workers=3) as crawler_link_threads:\n",
    "    future_tasks = {crawler_link_threads.submit(crawl_task, url): url for url in result_dict.keys()}\n",
    "    for future in concurrent.futures.as_completed(future_tasks):\n",
    "        result_dict[future.result()[0]] = future.result()[1]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们映射了一个临时字典`future_tasks`。里面包含了`submit`方法进行的函数预订，参数是`result_dict`的每个URL键。也就是说，我们用`future_tasks`为每个键创建了一个入口（entry）。映射之后，我们需通过`concurrent.futures.as_completed(fs, timeout=None)`方法来寻找`future_tasks`里入口，通过一个循环来获取前面函数预订的结果。这个调用会返回一个由`Future`对象实例组成的迭代器（iterator）。因此，我们可以迭代出每一个已经被处理好的预订结果。在`ThreadPoolExecutor`对象的最后，我们用`Future`对象`result()`方法获取抓取线程的结果。在这个例子的抓取阶段，`Future`对象返回的结果是元组。用这种方式我们获得的`future_tasks`最终结果如下图所示：\n",
    "![](http://muxuezi.github.io/posts/ppp/ch4/threading_webcrawler.png)\n",
    "我们再一次看到线程池中的线程出现的顺序并不是自然顺序，这是由于线程具有不确定性。重要的是通过打印`result_dict`的项目来展示最终结果。\n",
    "\n",
    "[源代码](http://muxuezi.github.io/posts/ppp/ch4/parallel_web_crawler.py)\n",
    "\n",
    "## 本章小结\n",
    "在这一章里，我们重点介绍了线程的概念。我们通过`threading`和`concurrent.futures`模块实现了上一章里提到的两个问题。通过问题的解决展示了两个模块的原理和灵活性。\n",
    "\n",
    "下一章，我们将重点介绍如何用`multiprocessing`和`ProcessPoolExecutor`解决这两个问题。"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
